package com.shujia.flink.tf

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object Demo4KeyBY {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    /**
      * keyBy 将相同的key 发送到同一个task中
      *
      */
    //scala api
    //kvDS.keyBy(kv => kv._1).print()


    //java api
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(new KeySelector[(String, Int), String] {
      override def getKey(value: (String, Int)): String = {
        value._1
      }
    })

    keyByDS.print()

    env.execute()


  }

}
